import itertools
import os
import random
import warnings
from collections import Counter
from pathlib import Path
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import xarray as xr
from sklearn.metrics import mean_squared_error
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.tsa.stattools import adfuller

3 SARIMA Model
The Seasonal Autoregressive Integrated Moving Average (SARIMA) model is a robust and widely used tool in time series forecasting. SARIMA extends the ARIMA model by incorporating seasonal components, making it suitable for data with seasonal patterns. The model includes parameters for autoregression (p), differencing (d), and moving average (q), along with their seasonal counterparts (P, D, Q). These parameters are crucial for capturing both the trend and seasonal variations in the data, thus providing more accurate predictions for time series that exhibit seasonality (Sim et al. 2022; Azad et al. 2022). The SARIMA model’s ability to handle non-stationary data through differencing and its inclusion of seasonal parameters make it highly effective for analyzing complex time series data.
SARIMA(p,d,q)(P,D,Q)(S) with: - p: number of lag observations included in the model - d: number of times the raw observations are differenced to make the time series stationary - q: number of lagged forecast errors in the prediction equation - P: number of lag observations in the seasonal part of the model - D: number of times the observations are differenced over the seasonal period to make the series stationary - Q: number of lagged forecast errors in the seasonal part of the model - S: length of the seasonal cycle
3.1 Description and Overview
1. Load Packages
2. Define base directory
3. Grid Search for Cube 665 (most complete cube) - Stationarity Check: Augmented Dickey Fuller Test - Differencing for non-stationary pixels - Grid search for p,q,P and Q to define most common parameter combination - Get most common parameter combination for cube 665
4. Fit SARIMA model and make predictions - Stationarity Check: Augmented Dickey Fuller Test - Differencing for non-stationary pixels - Fit SARIMA model to each pixel in the four most complete cubes - Calculate training and test mean squared error (mse) and best akaike information criterion (aic) - Save fitted and predicted NDVI values to separate .nc-files
3.2 1. Load packages
First we import the necessary packages.
3.3 2. Define base directory
# Define base_dir for consistent path management.
# The notebook lives one level below the repository root, so the parent of
# the current working directory is the project base used for all data paths.
# (Fixed: captured cell output was fused onto the print() line.)
notebook_dir = Path(os.getcwd()).resolve()
base_dir = notebook_dir.parent
print(base_dir)
3.4 3. Grid Search for Cube 665 (most complete cube)
Selecting the best parameter combinations for a SARIMA model involves a systematic approach. Initially, the data must be checked for stationarity using tests like the Augmented Dickey-Fuller (ADF) test. Once stationarity is confirmed, a grid search is applied in order to identify suitable values for p, q, P and Q parameters. Due to limited computing power we apply the grid search only for 30 random pixels in the most complete cube (Cube 665).
The final model is selected based on the Akaike Information Criterion (AIC), which helps in identifying the model with the best fit by minimizing this value (Sim et al. 2022). The Training Mean Squared Error (MSE) is used to validate the effectiveness of the model, by minimizing it. This systematic approach ensures that the chosen SARIMA model effectively captures the underlying patterns in the NDVI time series data.
Therefore we follow those steps: - Iterate over each of the 30 random pixel in the cube, ignoring pixels that only contain NaNs 1. Apply an Augmented Dickey Fuller Test & Differencing for d (0 or 1), D = 1 in order to check for stationarity and apply differencing 2. Grid Search for p, q, P & Q by applying different parameter combination to the SARIMA model - Get most common parameter combination for the 30 random pixel in cube 665
# Suppress warnings: SARIMAX fitting emits frequent convergence and
# frequency-inference warnings that would otherwise flood the output.
warnings.filterwarnings("ignore")
# Function to perform the Augmented Dickey-Fuller test
def adf_test(time_series):
    """
    Run the Augmented Dickey-Fuller stationarity test on a series.

    Parameters:
    time_series (np.ndarray): The time series data to be tested.

    Returns:
    float: The ADF test statistic.
    float: The p-value of the test.
    """
    # adfuller returns (statistic, p-value, lags, nobs, ...); only the
    # first two entries are needed here.
    adf_statistic, p_value = adfuller(time_series)[:2]
    return adf_statistic, p_value
# Function to determine differencing orders
def determine_differencing(time_series, seasonal_period):
    """
    Choose the differencing orders for a pixel time series.

    Parameters:
    time_series (xarray.DataArray): The time series data.
    seasonal_period (int): The seasonal period for the time series
        (accepted for interface symmetry; the seasonal order D is fixed
        at 1 regardless of its value).

    Returns:
    int: Order of non-seasonal differencing (d).
    int: Order of seasonal differencing (D), always 1.

    Raises:
    ValueError: If the series has fewer than two data points.
    """
    values = time_series.values.flatten()
    if len(values) < 2:
        raise ValueError("Time series must have more than one data point.")
    _, p_value = adf_test(values)
    # Fail to reject the unit-root null at the 5% level -> difference once.
    if p_value >= 0.05:
        d = 1
    else:
        d = 0
    # One seasonal difference is assumed sufficient for the yearly NDVI cycle.
    return d, 1
# Function to fit SARIMAX model and return fitted model, training MSE, and AIC
def fit_sarimax_model(time_series, order, seasonal_order):
    """
    Fit a SARIMAX model to a series and report its in-sample quality.

    Parameters:
    time_series (np.ndarray): The time series data to fit.
    order (tuple): The order of the SARIMAX model (p, d, q).
    seasonal_order (tuple): The seasonal order of the SARIMAX model (P, D, Q, s).

    Returns:
    SARIMAXResults: The fitted SARIMAX model.
    float: The mean squared error of the training data.
    float: The AIC of the fitted model.
    """
    # Stationarity/invertibility enforcement is disabled so the grid search
    # can explore parameter combinations without hard failures.
    sarimax = SARIMAX(
        time_series,
        order=order,
        seasonal_order=seasonal_order,
        enforce_stationarity=False,
        enforce_invertibility=False,
    )
    results = sarimax.fit(disp=False)
    training_mse = mean_squared_error(time_series, results.fittedvalues)
    return results, training_mse, results.aic
# Function to perform grid search
def grid_search_sarimax(
    time_series, p_range, d_range, q_range, P_range, D_range, Q_range, S_range
):
    """
    Grid-search SARIMAX parameters, selecting the model with minimal AIC.

    Parameters:
    time_series (np.ndarray): The time series data to fit.
    p_range (range): Range of p values to consider.
    d_range (range): Range of d values to consider.
    q_range (range): Range of q values to consider.
    P_range (range): Range of P values to consider.
    D_range (range): Range of D values to consider.
    Q_range (range): Range of Q values to consider.
    S_range (list): List of seasonal periods to consider.

    Returns:
    SARIMAXResults: The best fitted SARIMAX model (None if every fit failed).
    tuple: The best parameter combination as (order, seasonal_order).
    float: The training MSE of the best model.
    float: The AIC of the best model.
    """
    best_aic = float("inf")
    best_mse = float("inf")
    best_params = None
    best_model = None
    candidates = itertools.product(
        p_range, d_range, q_range, P_range, D_range, Q_range, S_range
    )
    for p, d, q, P, D, Q, S in candidates:
        order, seasonal_order = (p, d, q), (P, D, Q, S)
        try:
            model, training_mse, aic = fit_sarimax_model(
                time_series, order, seasonal_order
            )
        except Exception as e:
            # A failing combination is reported and skipped, not fatal.
            print(f"Error with parameters {order} and {seasonal_order}: {e}")
            continue
        if aic < best_aic:
            best_aic, best_mse = aic, training_mse
            best_params = (order, seasonal_order)
            best_model = model
    return best_model, best_params, best_mse, best_aic
# Define the seasonal period (73 observations per year at a 5-day frequency)
seasonal_period = 73

# Define parameter ranges for grid search
p_range = range(0, 2)
q_range = range(0, 2)
P_range = range(0, 2)
Q_range = range(0, 2)
S_range = [seasonal_period]

# Generate 30 random pixel coordinates within the 128x128 cube grid
grid_size = 128
random_pixel_pairs = [
    (random.randint(0, grid_size - 1), random.randint(0, grid_size - 1))
    for _ in range(30)
]

# Specific file for Cube_665 (the most complete cube)
data_folder_train = base_dir / "data" / "data_train"
data_folder_test = base_dir / "data" / "data_test"
nc_file_train = "ds_B_Cube_665_train.nc"
nc_file_test = "Cube_665_test.nc"

# Open the specific dataset
ds_train = xr.open_dataset(os.path.join(data_folder_train, nc_file_train))
ndvi_data_train = ds_train["NDVI"]
ds_test = xr.open_dataset(os.path.join(data_folder_test, nc_file_test))
ndvi_data_test = ds_test["NDVI"]

# Initialize arrays to store predictions and MSEs (93 forecast steps per pixel)
predictions = np.full((30, 93), np.nan)
train_mse_array = np.full(30, np.nan)
test_mse_array = np.full(30, np.nan)

# List to store the best (order, seasonal_order) found for each pixel
parameter_combinations = []

# Iterate over the selected pixels
for idx, (x, y) in enumerate(random_pixel_pairs):
    try:
        ndvi_pixel_train = ndvi_data_train.isel(x=x, y=y)
        if np.all(np.isnan(ndvi_pixel_train)):
            continue  # Skip pixels with only NaNs

        # Per-pixel non-seasonal differencing order from the ADF test; D fixed at 1
        d, D = determine_differencing(ndvi_pixel_train, seasonal_period)
        d_range = [d]
        D_range = [D]

        best_model, best_params, best_mse, best_aic = grid_search_sarimax(
            ndvi_pixel_train.values.flatten(),
            p_range,
            d_range,
            q_range,
            P_range,
            D_range,
            Q_range,
            S_range,
        )
        if best_model is None:
            continue  # No parameter combination could be fitted for this pixel

        # Append the best parameter combination
        parameter_combinations.append(best_params)

        # Forecast
        forecast_steps = 93  # Fixed forecast steps to 93
        forecast = best_model.get_forecast(steps=forecast_steps).predicted_mean

        # Store predictions and training MSE
        predictions[idx, :] = forecast
        train_mse_array[idx] = best_mse

        # Test data for the same pixel (5-day frequency starting 2021-07-03)
        ndvi_pixel_test = ndvi_data_test.isel(x=x, y=y)
        test_time_index = pd.date_range(
            start="2021-07-03", periods=len(ndvi_pixel_test.time), freq="5D"
        )
        test_series = pd.Series(ndvi_pixel_test.values.flatten(), index=test_time_index)

        # Create a time index for the forecast data, continuing the training index
        original_time_index = pd.date_range(
            start="2017-07-04", periods=len(ndvi_pixel_train.time), freq="5D"
        )
        forecast_time_index = pd.date_range(
            start=original_time_index[-1] + pd.DateOffset(days=5),
            periods=forecast_steps,
            freq="5D",
        )
        forecast_series = pd.Series(forecast, index=forecast_time_index)

        # Align forecast and test data on their common time stamps
        common_indices = forecast_series.index.intersection(test_series.index)
        aligned_forecast_series = forecast_series.loc[common_indices]
        aligned_test_series = test_series.loc[common_indices]

        # Drop NaNs from both series, then re-align to matching lengths
        aligned_forecast_series = aligned_forecast_series.dropna()
        aligned_test_series = aligned_test_series.dropna()
        final_common_indices = aligned_forecast_series.index.intersection(
            aligned_test_series.index
        )
        aligned_forecast_series = aligned_forecast_series.loc[final_common_indices]
        aligned_test_series = aligned_test_series.loc[final_common_indices]

        # Ensure lengths match after alignment
        if len(aligned_forecast_series) != len(aligned_test_series):
            raise ValueError(
                "Aligned forecast and test series have different lengths after dropping NaNs"
            )

        # Calculate test MSE
        test_mse = mean_squared_error(aligned_test_series, aligned_forecast_series)
        test_mse_array[idx] = test_mse
        # print(f"Pixel ({x}, {y}): Best Params: {best_params}, Best Train MSE: {best_mse}, Best AIC: {best_aic}, Test MSE: {test_mse}")
    except Exception as e:
        # Best-effort per pixel: a failing pixel must not abort the whole search
        # print(f"Failed to process pixel ({x}, {y}): {e}")
        continue

# Find the most common parameter combination.
# Guard against an empty result set, which would raise IndexError on [0].
if parameter_combinations:
    most_common_params = Counter(parameter_combinations).most_common(1)[0]
    print(
        f"The most common parameter combination is: {most_common_params[0]} with a count of {most_common_params[1]}"
    )
else:
    print("No pixel could be fitted; no parameter combination available.")
3.5 4. Fit SARIMA model and make predictions
We fit the SARIMA model to the NDVI time series of each pixel of the four most complete cubes. Predictions of the NDVI are made for the next 93 time steps (2021-07-03 to 2022-10-06). The model’s performance is evaluated based on training and test mean squared error (MSEs). We iterate over all pixels in the selected four most complete cubes (I: Cube 665, II: Cube 80, III: Cube 1203, IV: Cube 1301).
The analysis includes the following steps: - Augmented Dickey Fuller (ADF) Test is carried out to check each pixel for stationarity. The function returns the ADF test statistics and the p-value. - After that the differencing order is determined, which is required to make the time series stationary based on the ADF test results. - The non-seasonal differencing parameter d is set to 1 (for non-stationary pixels) or 0 (for stationary pixels) - The seasonal differencing parameter D is set to 1 for all pixels. Research has shown that the value of D is typically determined by the seasonality present in the data and can be set to 1 in order to effectively capture the seasonal patterns in the NDVI series (Silva et al. 2018). This aligns with the general approach in time series analysis, where (D = 1) is often sufficient to remove seasonality, especially when the data exhibits clear and repetitive seasonal cycles. - Fit the SARIMA Model with SARIMAX((p,d,q)(P,D,Q)(S)) - The SARIMAX() function of the statsmodels package fits a pixelwise SARIMA model to the NDVI time series of each pixel and returns the fitted model, training MSE and AIC - We predict the next 93 time steps (from 2021-07-03 to 2022-10-06) of the NDVI time series for each pixel in each of the cubes - Parameter combination of p, q, P and Q are predefined: p, q, P, Q = 1, 0, 0, 0 as this was the most common parameter combination in the grid search for the 30 random pixels in the most complete cube 665 - Explanation of the different parameters: - p=1: the model uses value of the previous time step to predict the current value - d=0: no differencing is applied (the time series is stationary), d=1 differencing is applied (the time series is non-stationary) - q=0: the model uses no previous forecast error to predict the current value - P=0: no seasonal autoregressive terms are used - D=1: the model includes one seasonal differencing component - Q=0: no seasonal moving average terms are used - S=73: in our data with a
frequency of 5 days we have 73 time steps a year - The training MSE (between training and fitted data), the best training AIC and the test MSE (between predicted and test data) are calculated - The fitted values together with the training MSE and best training AIC are saved in a separate netcdf file ('/data/data_fitted/SARIMA_fitted_Cube_XY.nc') per cube - The predicted values together with the test MSE are saved in a separate netcdf file ('/data/data_predictions/SARIMA_predicted_Cube_XY.nc') per cube
# Suppress warnings: pixelwise SARIMAX fitting produces many convergence
# warnings that would otherwise flood the output.
warnings.filterwarnings("ignore")
# definition of functions for Augmented Dickey Fuller Test, Differencing, Fitting SARIMAX model
def adf_test(time_series):
    """
    Perform the Augmented Dickey-Fuller test on a time series.

    Parameters:
    time_series (np.ndarray): The time series data to be tested.

    Returns:
    float: The ADF test statistic.
    float: The p-value of the test.
    """
    # Only the statistic and p-value from adfuller's result tuple are used.
    result = adfuller(time_series)
    return result[0], result[1]
def determine_differencing(time_series, seasonal_period):
    """
    Determine the differencing orders for a given time series.

    Parameters:
    time_series (xarray.DataArray): The time series data.
    seasonal_period (int): The seasonal period for the time series
        (kept for interface symmetry; D is fixed at 1 regardless).

    Returns:
    int: Order of non-seasonal differencing (d).
    int: Order of seasonal differencing (D), always 1.

    Raises:
    ValueError: If the series has fewer than two data points.
    """
    series = time_series.values.flatten()
    if len(series) < 2:
        raise ValueError("Time series must have more than one data point.")
    _, p_value = adf_test(series)
    # Non-stationary at the 5% significance level -> one non-seasonal difference.
    d = 1 if p_value >= 0.05 else 0
    return d, 1
def fit_sarimax_model(time_series, order, seasonal_order):
    """
    Fit a SARIMAX model and return the fitted model, training MSE, and AIC.

    Parameters:
    time_series (np.ndarray): The time series data to fit.
    order (tuple): The order of the SARIMAX model (p, d, q).
    seasonal_order (tuple): The seasonal order of the SARIMAX model (P, D, Q, s).

    Returns:
    SARIMAXResults: The fitted SARIMAX model.
    float: The mean squared error of the training data.
    float: The AIC of the fitted model.
    """
    # Enforcement is disabled so fits do not fail hard on borderline parameters.
    results = SARIMAX(
        time_series,
        order=order,
        seasonal_order=seasonal_order,
        enforce_stationarity=False,
        enforce_invertibility=False,
    ).fit(disp=False)
    mse = mean_squared_error(time_series, results.fittedvalues)
    return results, mse, results.aic
# Define the seasonal period (73 observations per year at a 5-day frequency)
seasonal_period = 73

# Define fixed SARIMAX parameters: the most common combination found by the
# grid search on cube 665 is (p, q, P, Q) = (1, 0, 0, 0); d is chosen per
# pixel via the ADF test and D is fixed at 1.
p, q, P, Q = 1, 0, 0, 0

# Folders with the training and test NetCDF datasets
data_folder_train = base_dir / "data" / "data_train"
data_folder_test = base_dir / "data" / "data_test"
# Sort both listings: os.listdir returns entries in arbitrary order, so
# sorting is required for zip() below to pair each training cube with its
# corresponding test cube deterministically.
nc_files_train = sorted(f for f in os.listdir(data_folder_train) if f.endswith(".nc"))
nc_files_test = sorted(f for f in os.listdir(data_folder_test) if f.endswith(".nc"))

# Ensure that the test files correspond to the training files
if len(nc_files_train) != len(nc_files_test):
    raise ValueError(
        "The number of training files does not match the number of test files"
    )

# Iterate over all NetCDF files (one train/test pair per cube)
for nc_file_train, nc_file_test in zip(nc_files_train, nc_files_test):
    print(f"Processing file {nc_file_train}")
    ds_train = xr.open_dataset(os.path.join(data_folder_train, nc_file_train))
    ndvi_data_train = ds_train["NDVI"]
    ds_test = xr.open_dataset(os.path.join(data_folder_test, nc_file_test))
    ndvi_data_test = ds_test["NDVI"]

    # Initialize arrays to store predictions, fitted values, MSEs, and AICs
    predictions = np.full(
        (ndvi_data_test.sizes["x"], ndvi_data_test.sizes["y"], 93), np.nan
    )
    fitted_values_array = np.full(
        (
            ndvi_data_train.sizes["x"],
            ndvi_data_train.sizes["y"],
            len(ndvi_data_train.time),
        ),
        np.nan,
    )
    train_mse_array = np.full(
        (ndvi_data_train.sizes["x"], ndvi_data_train.sizes["y"]), np.nan
    )
    train_aic_array = np.full(
        (ndvi_data_train.sizes["x"], ndvi_data_train.sizes["y"]), np.nan
    )
    test_mse_array = np.full(
        (ndvi_data_train.sizes["x"], ndvi_data_train.sizes["y"]), np.nan
    )
    d_array = np.full((ndvi_data_train.sizes["x"], ndvi_data_train.sizes["y"]), np.nan)

    # Iterate over all pixels in the dataset
    for x in range(ndvi_data_train.sizes["x"]):
        for y in range(ndvi_data_train.sizes["y"]):
            try:
                ndvi_pixel_train = ndvi_data_train.isel(x=x, y=y)
                if np.all(np.isnan(ndvi_pixel_train)):
                    continue  # Skip pixels with only NaNs

                # Per-pixel non-seasonal differencing order from the ADF test
                d, D = determine_differencing(ndvi_pixel_train, seasonal_period)
                order = (p, d, q)
                seasonal_order = (P, D, Q, seasonal_period)
                model, training_mse, aic = fit_sarimax_model(
                    ndvi_pixel_train.values.flatten(), order, seasonal_order
                )

                # Store d, fitted values, training MSE and AIC
                d_array[x, y] = d
                fitted_values_array[x, y, :] = model.fittedvalues
                train_mse_array[x, y] = training_mse
                train_aic_array[x, y] = aic

                # Forecast the next 93 time steps (2021-07-03 to 2022-10-06)
                forecast_steps = 93  # Fixed forecast steps to 93
                forecast = model.get_forecast(steps=forecast_steps).predicted_mean
                predictions[x, y, :] = forecast

                # Test data for the same pixel (5-day frequency from 2021-07-03)
                ndvi_pixel_test = ndvi_data_test.isel(x=x, y=y)
                test_time_index = pd.date_range(
                    start="2021-07-03", periods=len(ndvi_pixel_test.time), freq="5D"
                )
                test_series = pd.Series(
                    ndvi_pixel_test.values.flatten(), index=test_time_index
                )

                # Create a time index for the forecast data, continuing the
                # 5-day training index that starts on 2017-07-04
                original_time_index = pd.date_range(
                    start="2017-07-04", periods=len(ndvi_pixel_train.time), freq="5D"
                )
                forecast_time_index = pd.date_range(
                    start=original_time_index[-1] + pd.DateOffset(days=5),
                    periods=forecast_steps,
                    freq="5D",
                )
                forecast_series = pd.Series(forecast, index=forecast_time_index)

                # Align forecast and test data on their common time stamps
                common_indices = forecast_series.index.intersection(test_series.index)
                aligned_forecast_series = forecast_series.loc[common_indices]
                aligned_test_series = test_series.loc[common_indices]

                # Drop NaNs from both series, then re-align to matching lengths
                aligned_forecast_series = aligned_forecast_series.dropna()
                aligned_test_series = aligned_test_series.dropna()
                final_common_indices = aligned_forecast_series.index.intersection(
                    aligned_test_series.index
                )
                aligned_forecast_series = aligned_forecast_series.loc[
                    final_common_indices
                ]
                aligned_test_series = aligned_test_series.loc[final_common_indices]

                # Ensure lengths match after alignment
                if len(aligned_forecast_series) != len(aligned_test_series):
                    raise ValueError(
                        "Aligned forecast and test series have different lengths after dropping NaNs"
                    )

                # Calculate test MSE
                test_mse = mean_squared_error(
                    aligned_test_series, aligned_forecast_series
                )
                test_mse_array[x, y] = test_mse
                # print(f"  Pixel ({x}, {y}): d={d}, D={D}, Training MSE: {training_mse}, AIC: {aic}, Test MSE: {test_mse}")
            except Exception as e:
                # Best-effort per pixel: report and move on rather than abort the cube
                print(
                    f" Failed to process pixel ({x}, {y}) in file {nc_file_train}: {e}"
                )
                continue

    # Ensure output directories exist
    fitted_output_dir = base_dir / "data" / "data_fitted"
    predicted_output_dir = base_dir / "data" / "data_predictions"
    os.makedirs(fitted_output_dir, exist_ok=True)
    os.makedirs(predicted_output_dir, exist_ok=True)

    # Save fitted values and training MSEs/AICs to a new NetCDF file
    fitted_output_ds = xr.Dataset(
        {
            "fitted_values": (["x", "y", "time"], fitted_values_array),
            "train_mse": (["x", "y"], train_mse_array),
            "train_aic": (["x", "y"], train_aic_array),
            "d": (["x", "y"], d_array),
        },
        coords={
            "time": ndvi_data_train.coords["time"],
            "x": ndvi_data_train.coords["x"],
            "y": ndvi_data_train.coords["y"],
        },
    )
    fitted_output_file = os.path.join(
        fitted_output_dir, f'SARIMA_fitted_{nc_file_train.split(".")[0]}.nc'
    )
    fitted_output_ds.to_netcdf(fitted_output_file)
    print(f"Fitted values saved to '{fitted_output_file}'")

    # Save predictions and test MSEs to a new NetCDF file
    prediction_output_ds = xr.Dataset(
        {
            "predictions": (["x", "y", "time"], predictions),
            "test_mse": (["x", "y"], test_mse_array),
        },
        coords={
            "time": pd.date_range(start="2021-07-03", periods=93, freq="5D"),
            "x": ndvi_data_train.coords["x"],
            "y": ndvi_data_train.coords["y"],
        },
    )
    prediction_output_file = os.path.join(
        predicted_output_dir, f'SARIMA_predicted_{nc_file_train.split(".")[0]}.nc'
    )
    prediction_output_ds.to_netcdf(prediction_output_file)
    print(f"Predictions saved to '{prediction_output_file}'")